package flink_p1

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

/**
 * Streaming word count over a text socket.
 *
 * Reads lines from a socket at 127.0.0.1:8889, splits each line on commas,
 * pairs every token with a count of 1, and keeps a running sum per token,
 * printing each updated `(token, count)` pair.
 *
 * The operators are given different parallelism on purpose (default 1,
 * flatMap 2, map 3) — presumably to observe how parallelism changes affect
 * operator chaining in the job graph, as the object name suggests.
 */
object FlinkTest_02_chain {

  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * @param args command-line arguments; not used by this job
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Default parallelism for every operator that does not override it below.
    env.setParallelism(1)

    val lines: DataStream[String] = env.socketTextStream("127.0.0.1", 8889)

    // Each operator overrides the default parallelism explicitly.
    val tokens: DataStream[String] = lines.flatMap(_.split(",")).setParallelism(2)
    val tokenOnes: DataStream[(String, Int)] = tokens.map((_, 1)).setParallelism(3)

    // Key by the token itself using a typed key selector. The index-based
    // keyBy(0) is deprecated and produces an untyped Tuple key; keyBy(_._1)
    // groups by the same field while keeping the key type as String.
    val runningCounts: DataStream[(String, Int)] = tokenOnes.keyBy(_._1).sum(1)

    runningCounts.print()

    // Nothing runs until execute() is called; the calls above only build the job graph.
    env.execute("flink test app")
  }

}
